#include "StdAfx.h"
#include "QS_Configuration.h"
#include "ShmQuoteSubscribe.h"
#include "ShmQuoteFeed.h"
#include "MdSpi.h"
#include "FileOperations.h"
#include "QuoteProxy.h"
#include "QuoteAggregator.h"

#include <iostream>
#include <sstream>
#include <cassert>
#include <cstdlib>
#include <csignal>
#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <boost/shared_array.hpp>
#include <boost/algorithm/string.hpp>

#ifdef WIN32
#pragma comment(lib, "./ThostTraderApi/thostmduserapi.lib")
#endif

using namespace std;
using namespace boost::interprocess;

enum {
	INVALID_STARTUP_ARGUMENTS = 1,
	BEGIN_QUOTE_PROXY_ERROR = 2,
	QUOTE_AGGREGATOR_INITIALIZATION_ERROR = 3,
	QUOTE_PROXY_UNEXPECTED_EXIT = 4
};

CQSConfiguration qsConfig;
vector<QuoteProxyPtr> quoteProxyVec;

int launchChildTest(int argc, char* argv[]);
void signalHandler( int signum );
void subscribeQuoteProc(CShmQuoteSubscribeProducer * producer);
void OnQuotePush(CThostFtdcDepthMarketDataField* mktDataField);

void OnSubscribeMarketData(char** symbolArr, int symCount);
void OnUnsubscribeMarketData(char** symbolArr, int symCount);
void OnTerminateNotified();

int main(int argc, char* argv[])
{
	if(!qsConfig.Load(argc, argv))
	{
		cout << "Invalid or missing arguments" << endl;
		return INVALID_STARTUP_ARGUMENTS;
	}

	if(qsConfig.TestHost())
	{
		return launchChildTest(argc, argv);
	}

	cout << "Startup QuoteStation v1.0" << endl;
	cout << "Connection string: " << qsConfig.ConnectionString() << endl;
	cout << "BrokerId: " << qsConfig.BrokerId() << endl;
	cout << "Username: " << qsConfig.Username() << endl;
	cout << "Password: " << qsConfig.Password() << endl;

	signal(SIGINT, signalHandler);
	signal(SIGTERM, signalHandler);

	CQuoteAggregator quoteAggregator(
		boost::bind(&OnSubscribeMarketData,  _1, _2), 
		boost::bind(&OnUnsubscribeMarketData, _1, _2), 
		boost::bind(&OnTerminateNotified));

	QuoteProxyPtr quoteProxyMain(QuoteProxyPtr(new CQuoteProxy(&quoteAggregator,
		qsConfig.ConnectionString(), qsConfig.BrokerId(), qsConfig.Username(), qsConfig.Password())));
	if(!quoteProxyMain->Begin())
		return BEGIN_QUOTE_PROXY_ERROR;
	quoteProxyVec.push_back(quoteProxyMain);

	if(!quoteAggregator.Initialize(qsConfig.BrokerId(), qsConfig.Username()))
		return QUOTE_AGGREGATOR_INITIALIZATION_ERROR;

	int proxyExitCode = 0;
	// Wait until all quote proxy end successfully
	for(vector<QuoteProxyPtr>::iterator iter = quoteProxyVec.begin();
		iter != quoteProxyVec.end(); ++iter)
	{
		int errCd = (*iter)->WaitUntilEnd();
		if(errCd != 0)
			proxyExitCode = errCd;
	}

	return proxyExitCode;
}

void OnSubscribeMarketData(char** symbolArr, int symCount)
{
	for(vector<QuoteProxyPtr>::iterator iter = quoteProxyVec.begin();
		iter != quoteProxyVec.end(); ++iter)
	{
		(*iter)->SubscribeMarketData(symbolArr, symCount);
	}
}

void OnUnsubscribeMarketData(char** symbolArr, int symCount)
{
	for(vector<QuoteProxyPtr>::iterator iter = quoteProxyVec.begin();
		iter != quoteProxyVec.end(); ++iter)
	{
		(*iter)->UnsubscribeMarketData(symbolArr, symCount);
	}
}

void OnTerminateNotified()
{
	for(vector<QuoteProxyPtr>::iterator iter = quoteProxyVec.begin();
		iter != quoteProxyVec.end(); ++iter)
	{
		(*iter)->End();
	}
}

int launchChildTest(int argc, char* argv[])
{
	cout << "This is a testing host, which will launch a child for test" << endl;
	stringstream sCmd;
	assert(argc == 11);
	for (int i = 0; i < 9; ++i)
	{
		if(i > 0)
			sCmd << " ";
		sCmd << argv[i];
	}
	sCmd << ends;

	string shmName = "SubscribeQuote-" + qsConfig.BrokerId() + "-" + qsConfig.Username();
	CShmQuoteSubscribeProducer producer(shmName);
	producer.Init();

	string quoteFeedName = "QuoteFeed-" + qsConfig.BrokerId() + "-" + qsConfig.Username();
	CShmQuoteFeedConsumer feedee(quoteFeedName, boost::bind(&OnQuotePush, _1));
	feedee.Init();
	feedee.Start();

	boost::thread th(boost::bind(&subscribeQuoteProc, &producer));

	string cmd = sCmd.str();
	cout << "Launch Child : " << cmd << endl;
	int ret = system(cmd.c_str());
	cout << "Child process exit " << ret << endl;
	return ret;

}

void subscribeQuoteProc(CShmQuoteSubscribeProducer * producer)
{
	vector<string> symbols;
	symbols.push_back("IF1305");
	symbols.push_back("IF1306");

	for(int i = 0; i < 5; ++i)
	{
		boost::this_thread::sleep_for(boost::chrono::seconds(15));
		if(i < 4)
		{
			if(i % 2 == 0)
				producer->Put(symbols, true);
			else
				producer->Put(symbols, false);
		}
		else
			producer->NotifyTerminate();
	}
	cout << "Test subscribeQuoteProc done." << endl;
}

void OnQuotePush(CThostFtdcDepthMarketDataField* mktDataField)
{
	cout << "[Parent process] OnQuotePush : " << mktDataField->InstrumentID << ", "
		<< mktDataField->LastPrice << ", "
		<< mktDataField->UpdateTime << ", "
		<< mktDataField->UpdateMillisec << endl;
}

void signalHandler( int signum )
{
	cout << "signal (" << signum << ") received.\n";

	switch(signum)
	{
	case SIGINT:
		cout << "Interrupt received" << endl;
		OnTerminateNotified();
		break;
	case SIGTERM:
		cout << "Terminate received" << endl;
		OnTerminateNotified();
		break;
	}
	// cleanup and close up stuff here  
	// terminate program  

	//exit(signum);  

}